-- compute available slots under worker pool concurrency limits
WITH worker_slots AS (
    SELECT
        wp.id,
        MAX(0, wp.concurrency_limit - count(fr.id)) AS available_slots
    FROM
        work_pool wp
        JOIN work_queue wq ON wq.work_pool_id = wp.id
        LEFT JOIN flow_run fr ON wq.id = fr.work_queue_id
            AND fr.state_type in('RUNNING', 'PENDING')
    WHERE
        wp.is_paused IS FALSE
        AND wp.concurrency_limit IS NOT NULL
    GROUP BY
        wp.id
),


-- compute available slots under worker pool queue concurrency limits
queue_slots AS (
    SELECT
        wq.id,
        MAX(0, wq.concurrency_limit - count(fr.id)) AS available_slots
    FROM
        work_queue wq
    LEFT JOIN flow_run fr ON wq.id = fr.work_queue_id
        AND fr.state_type in('RUNNING', 'PENDING')
WHERE
    wq.is_paused IS FALSE
    AND wq.concurrency_limit IS NOT NULL
GROUP BY
    wq.id
),


-- CTE that loads flow runs and applies worker pool queue limits
scheduled_flow_runs AS (
    SELECT
        wp.id AS run_work_pool_id,
        wq.id AS run_work_queue_id,
        fr.*,
        worker_slots.available_slots as available_worker_slots,
        ROW_NUMBER() OVER (PARTITION BY wp.id ORDER BY {% if respect_queue_priorities %}wq.priority ASC, {% endif %}fr.next_scheduled_start_time) AS work_pool_rank
    FROM
        work_pool wp
        JOIN work_queue wq ON wq.work_pool_id = wp.id
        LEFT JOIN worker_slots ON wp.id = worker_slots.id
        LEFT JOIN queue_slots ON wq.id = queue_slots.id

        JOIN (
            SELECT 
                fr.*,
                ROW_NUMBER() OVER (PARTITION BY work_queue_id ORDER BY next_scheduled_start_time) AS work_queue_rank
            FROM flow_run fr
            WHERE fr.state_type = 'SCHEDULED'
            {% if scheduled_after %}
            AND fr.next_scheduled_start_time >= :scheduled_after
            {% endif %}
            {% if scheduled_before %}
            AND fr.next_scheduled_start_time <= :scheduled_before
            {% endif %}
            ) fr
        ON 
            fr.work_queue_id = wq.id 
            AND work_queue_rank <= MIN(COALESCE(queue_slots.available_slots, :queue_limit), :queue_limit)

    WHERE
        wp.is_paused IS FALSE
        AND wq.is_paused IS FALSE
        {% if work_pool_ids %}
        -- optionally filter for specific worker pool IDs
        AND wp.id IN :work_pool_ids
        {% endif %}                    
        {% if work_queue_ids %}
        -- optionally filter for specific worker pool queue IDs
        AND wq.id IN :work_queue_ids
        {% endif %}
    )

SELECT
    *
FROM scheduled_flow_runs
WHERE
    work_pool_rank <= MIN(COALESCE(available_worker_slots, :worker_limit), :worker_limit)
ORDER BY 
    {% if respect_queue_priorities %}
    
    work_pool_rank ASC,
    {% endif %}
    next_scheduled_start_time ASC
LIMIT :limit